/**
 * FileName: StructuredStreamingKafkaSql
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/01/08 10:12
 * Description: Data ingestion using Structured Streaming.
 * Instead of generating the schema via a mapped class,
 * this uses withColumn and select
 * to split a single column into multiple columns.
 */
package cn.com.bonc.app;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import cn.com.bonc.util.ColumnsUtil;
import cn.com.bonc.util.DataFilterAndOperatorUtil;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.Tuple2;

import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;

import static org.apache.spark.sql.functions.*;


public class StructuredStreamingKafkaSql {

	public static void main(String[] args) {

		// Spark session; Redis connection details come from the project
		// configuration so the same build can target different environments.
		SparkSession spark = SparkSession
				.builder()
				.appName("KafkaJoinRedisApp")
				.config("spark.redis.host", ConfigurationManager.getProperty(Constants.REDIS_IP))
				.config("spark.redis.port", ConfigurationManager.getProperty(Constants.REDIS_PORT))
				.getOrCreate();

		// Streaming source: raw Kafka records from topic "stopic".
		Dataset<Row> kafkaDataset = spark
				.readStream()
				.format("kafka")
				.option("kafka.bootstrap.servers", ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS))
				.option("subscribe", "stopic")
				.option("startingOffsets", ConfigurationManager.getProperty(Constants.KAFKA_AUTO_OFFSET_RESET))
				.load();

		kafkaDataset.printSchema();

		// Static reference data loaded from Redis, exposed as the "redis" view
		// used by the SQL sub-query below. createOrReplaceTempView replaces
		// registerTempTable, which is deprecated since Spark 2.0.
		spark.read()
				.format("org.apache.spark.sql.redis")
				.option("table", "Xdata")
				.load()
				.createOrReplaceTempView("redis");

		// Split the single Kafka "value" string into multiple typed columns
		// (delimiter and column list come from ColumnsUtil), keeping the
		// Kafka ingestion timestamp alongside them.
		Dataset<Row> lineData = kafkaDataset
				.select(col("value").cast("string"), col("timestamp"))
				.withColumn("tmp", split(col("value"), ColumnsUtil.getRegex()))
				.select(ColumnsUtil.getInstance().combineColumns(ColumnsUtil.getInstance().getColumns("tmp"),
						col("timestamp")))
				.drop(col("tmp"));

		// Apply the SQL pipeline step by step: each statement reads the output
		// of the previous one through the re-registered "kafka" view.
		List<String> sqlList = Arrays.asList(
				"select * from kafka where url !='null'",
				"select phone,timestamp from kafka where phone in (select key from redis)"
		);
		for (String sqlStr : sqlList) {
			lineData.createOrReplaceTempView("kafka");
			lineData = spark.sql(sqlStr);
		}

		// Count occurrences of each phone number within a 30-second window.
		// NOTE(review): no watermark is defined, so aggregation state grows
		// without bound; consider withWatermark("timestamp", ...) upstream.
		Dataset<Row> windowedCounts = lineData
				.groupBy(window(col("timestamp"), "30 seconds"), col("phone"))
				.count();

		// Sink: write "phone|count" strings to Kafka topic "savetopic" in
		// Update mode, checkpointing on HDFS for offset/state recovery.
		// NOTE(review): the checkpoint path is hard-coded; consider moving it
		// into the project configuration alongside the other settings.
		StreamingQuery query = windowedCounts
				.select(concat_ws("|", col("phone"), col("count")).cast("string").as("value"))
				.writeStream()
				.format("kafka")
				.outputMode(OutputMode.Update())
				.option("checkpointLocation", "hdfs://192.168.70.21:9000/test_szj/checkpoint")
				.option("kafka.bootstrap.servers", ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS))
				.option("topic", "savetopic")
				.start();

		// Block the driver until the streaming query terminates. Propagate the
		// failure instead of swallowing it with printStackTrace, so the driver
		// exits non-zero and the cluster manager can restart the job.
		try {
			query.awaitTermination();
		} catch (StreamingQueryException e) {
			throw new RuntimeException("Streaming query terminated with an error", e);
		}
	}

}
